# findspark must run before any pyspark import so that the local Spark
# installation can be located and added to sys.path.
import findspark
findspark.init()

from pyspark import since, SparkContext as sc
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# BUG FIX: HiveContext lives in pyspark.sql, not the top-level pyspark
# package -- the original `from pyspark import HiveContext` raises
# ImportError and kills the script before any work is done.
# NOTE(review): `since`, `sc`, `SQLContext` and `hc` appear unused in this
# file (the Hive-enabled SparkSession below supersedes HiveContext); kept
# in case an unseen part of the project relies on them.
from pyspark.sql import HiveContext as hc

def write_to_mysql(df, table, mode="overwrite"):
    """Persist a Spark DataFrame to a table in the local `aqy` MySQL database.

    Args:
        df: pyspark.sql.DataFrame holding the result to save.
        table: target MySQL table name.
        mode: JDBC save mode. Defaults to "overwrite" (the original
            hard-coded behaviour); callers may now pass "append" etc.
    """
    # SECURITY NOTE(review): credentials are hard-coded in source; move them
    # to environment variables or a config file before deploying.
    mysql_url = "jdbc:mysql://localhost:3306/aqy"
    mysql_properties = {
        "user": "root",
        "password": "123456",
        # NOTE: com.mysql.jdbc.Driver is the Connector/J 5.x class name; for
        # the 8.x driver jar use com.mysql.cj.jdbc.Driver instead.
        "driver": "com.mysql.jdbc.Driver"
    }

    # Write the DataFrame into MySQL over JDBC.
    df.write.jdbc(url=mysql_url, table=table, mode=mode, properties=mysql_properties)


# Build a Hive-enabled session: cross joins are permitted and the RPC
# message cap is raised (value is in MiB per Spark config docs).
spark = (
    SparkSession.builder
    .appName("example-spark")
    .config("spark.sql.crossJoin.enabled", "true")
    .config('spark.rpc.message.maxSize', '1536')
    .enableHiveSupport()
    .getOrCreate()
)

# Load the raw CSV: first row supplies column names, column types inferred.
source = spark.read.csv("data.csv", header="true", sep=',', inferSchema=True)



# Expose the raw CSV as a view, then project just the columns the analysis
# needs, registering the result as `source`.
source.createTempView("source1")

# BUG FIX: `order` and `desc` are SQL keywords; selecting them unquoted can
# make the parser misread the statement (e.g. `order` as the start of
# ORDER BY). Backtick-quote them, as the CREATE TABLE below already does.
source = spark.sql(
    "select `order`,`desc`,showDate,tag,creator,contributor,firstChildBossStatusEnum,"
    "dq_updatestatus,bossStatus,tag_pcw,title,hot_score,year,month,score_per"
    " from source1")

source.createTempView("source")
# Create the Hive table that persists the cleaned data (no-op after the
# first run thanks to IF NOT EXISTS).
# BUG FIX: the statement ended with ';', which the spark.sql() API rejects
# with a ParseException (unlike the spark-sql CLI, the API does not split
# on semicolons) -- removed. Ragged indentation of `year`/`month` tidied.
spark.sql("""
CREATE TABLE if not exists aqy (
    `order` INT,
    `desc` STRING,
    `showDate` DATE,
    `tag` STRING,
    `creator` STRING,
    `contributor` STRING,
    `firstChildBossStatusEnum` STRING,
    `dq_updatestatus` STRING,
    `bossStatus` STRING,
    `tag_pcw` STRING,
    `title` STRING,
    `hot_score` INT,
    `year` INT,
    `month` INT,
    `score_per` DOUBLE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
""")

# First run only: populate the Hive table from the cleaned CSV view.
# spark.sql("insert into aqy select * from source ")

# Keep only rows that have both a hot score and a title, and expose them
# as view `v1` for every analysis query below.
valid_rows = spark.sql("select * from aqy where hot_score is not null and title is not null")
valid_rows.createTempView("v1")
valid_rows.show()


# --- Aggregate analyses over the filtered data ---

# Title count and mean hot score per release year, newest first.
yearly_sql = ("select `year`, count(1) as year_num, round(avg(hot_score),2)"
              " avg_hot from v1 group by year order by `year` desc")
year_data = spark.sql(yearly_sql)
year_data.show()

# Title count and mean hot score per release month.
monthly_sql = ("select `month`, count(1) as month_num, round(avg(hot_score),2)"
               " avg_hot from v1 group by month order by `month`")
month_data = spark.sql(monthly_sql)
month_data.show()

# VIP vs non-VIP: title count and mean hot score per bossStatus value.
status_sql = ("select bossStatus, count(1) as status, round(avg(hot_score),2)"
              " as avg_hot from v1 group by bossStatus")
status_data = spark.sql(status_sql)
status_data.show()

# Per-genre statistics: `tag` holds ';'-separated genres, so explode it
# into one row per (genre, score_per) pair.
# BUG FIX: the original wrapped the lateral view in a subquery selecting
# `*, exploded_tag`; since `*` already expands lateral-view generated
# columns, the subquery emitted two columns named `exploded_tag` and the
# outer reference to it fails as ambiguous. The subquery is unnecessary,
# so the lateral view is applied directly.
tag_exploded = spark.sql("""
    SELECT exploded_tag AS tag,
           score_per
    FROM v1
    LATERAL VIEW EXPLODE(SPLIT(tag, ';')) AS exploded_tag
""")
tag_exploded.createTempView('tag')
# Title count and mean score share per genre.
tag_data = spark.sql("select tag,count(1) tag_num, round(avg(score_per),2) avg_per from tag group by tag")
tag_data.show()

# Per-actor statistics: `contributor` is separated by the full-width comma
# '，'. Simplified to match the genre query above -- the original subquery
# selected `*, contributor`, duplicating the `contributor` column in the
# intermediate output for no benefit; the lateral view is applied directly.
contributor = spark.sql("""
    SELECT exploded_contributor AS contributor,
           score_per
    FROM v1
    LATERAL VIEW EXPLODE(SPLIT(contributor, '，')) AS exploded_contributor
""")

contributor.createTempView("contributor")
# Top 20 most frequent contributors with their mean score share.
contributor = spark.sql("select contributor, count(1) contributor_num, round(avg(score_per), 2) per_avg from contributor group by contributor order by contributor_num desc limit 20")
contributor.show()
# Language-tag statistics (e.g. Mandarin): `tag_pcw` is ';'-separated.
# Simplified to match the genre query above -- the original subquery
# selected `*, tag_pcw`, duplicating the `tag_pcw` column in the
# intermediate output for no benefit; the lateral view is applied directly.
tag_pcw = spark.sql("""
    SELECT exploded_tag_pcw AS tag_pcw,
           score_per
    FROM v1
    LATERAL VIEW EXPLODE(SPLIT(tag_pcw, ';')) AS exploded_tag_pcw
""")
tag_pcw.createTempView("tag_pcw")
# Compare Mandarin ('普通话') vs Hong Kong drama ('港剧') titles only.
pcw = spark.sql("select tag_pcw, count(1) pcw_num, round(avg(score_per),2) per_avg from tag_pcw"
          " where tag_pcw in ('普通话', '港剧') group by tag_pcw")
pcw.show()

print("Prepare to insert data to mysql....")
# Persist every result DataFrame to its MySQL table, one write per pair.
exports = (
    (year_data, "years"),
    (month_data, "month_data"),
    (tag_data, "tag"),
    (status_data, "vip_status"),
    (contributor, "contributor"),
    (pcw, "pcw"),
)
for frame, table_name in exports:
    write_to_mysql(frame, table_name)